LuCloud 7 BlockManagerMaster


Architecture

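The Driver contains a BlockManagerMaster and a BlockManagerMasterEndpoint. The endpoint stores references (Ref) to a number of BlockManagerSlaveEndpoints, and through these references the Driver manages the BlockManagers on the executors.

Each Executor contains a BlockManagerMaster and a BlockManagerSlaveEndpoint, as well as its BlockManager. The Executor registers its BlockManager and the corresponding BlockManagerSlaveEndpoint with the BlockManagerMasterEndpoint. After that it can update the block information held by the endpoint, ask where a needed block is located, have itself removed from the endpoint when the Executor shuts down, and so on.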
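The relationship between the two sides can be pictured with a toy model. The sketch below is not Spark code: `SlaveRef` and `MasterEndpoint` are hypothetical stand-ins, plain strings replace `BlockManagerId` and `BlockId`, and no RPC is involved. It only illustrates that the driver side keeps one reference per registered executor and works through it.

```scala
import scala.collection.mutable

object ArchitectureSketch {
  // Hypothetical stand-in for a reference to a BlockManagerSlaveEndpoint.
  final case class SlaveRef(executorId: String) {
    def removeBlock(blockId: String): String = s"$executorId removed $blockId"
  }

  // Hypothetical stand-in for the driver-side BlockManagerMasterEndpoint.
  class MasterEndpoint {
    private val slaves = mutable.HashMap.empty[String, SlaveRef]
    private val blockLocations = mutable.HashMap.empty[String, mutable.Set[String]]

    // An executor registers itself together with its slave reference.
    def register(executorId: String, ref: SlaveRef): Unit =
      slaves.update(executorId, ref)

    // An executor reports that it now holds a block.
    def updateBlockInfo(executorId: String, blockId: String): Unit =
      blockLocations.getOrElseUpdate(blockId, mutable.Set.empty[String]) += executorId

    // Anyone can ask which executors hold a block.
    def getLocations(blockId: String): Set[String] =
      blockLocations.get(blockId).map(_.toSet).getOrElse(Set.empty[String])

    // Forget an executor and every location entry that points at it.
    def removeExecutor(executorId: String): Unit = {
      slaves -= executorId
      blockLocations.values.foreach(_ -= executorId)
    }

    // The driver reaches remote blocks through the stored slave references.
    // This toy version does not update blockLocations afterwards.
    def removeBlock(blockId: String): Seq[String] =
      getLocations(blockId).toSeq.sorted
        .flatMap(id => slaves.get(id))
        .map(_.removeBlock(blockId))
  }

  def main(args: Array[String]): Unit = {
    val master = new MasterEndpoint
    master.register("exec-1", SlaveRef("exec-1"))
    master.updateBlockInfo("exec-1", "rdd_0_0")
    println(master.getLocations("rdd_0_0"))
    println(master.removeBlock("rdd_0_0"))
  }
}
```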

[Figure: architecture diagram]

Details from code view

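BlockManagerMaster

The Driver and the Executors manage BlockManagers through this class. It uses the Transmission Tools described below to turn each operation into a message, which is sent to an endpoint where the actual work is done.

Transmission Tools

The ask and askSync functions are both implemented in the RpcEndpointRef class and used in BlockManagerMaster. They carry messages from the driver and the executors to BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint.

The tell function is both implemented and used in BlockManagerMaster. Under the hood it is an askSync call.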

ask

/**
 * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
 * receive the reply within the specified timeout.
 *
 * This method only sends the message once and never retries.
 */
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

/**
 * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
 * receive the reply within a default timeout.
 *
 * This method only sends the message once and never retries.
 */
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

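ask is the most basic of the transmission tools. It uses scala.concurrent.Future to handle messages asynchronously without blocking: ask sends the message and immediately returns a Future (an object standing for a value that is not ready yet). The Future completes once the reply arrives, or fails if the request fails or times out.

I am not very familiar with asynchronous programming in Java/Scala, so I will not go into detail here. For more on the Future class, see the articles "Scala 中的异步事件处理" and "Scala之Future".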
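For readers who have not used Future before, here is a minimal sketch using only the Scala standard library. `askLike` is a hypothetical function that imitates the shape of ask by replying on another thread after a short delay; it is not part of Spark.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureSketch {
  // Hypothetical stand-in for a remote endpoint: the reply is computed
  // on another thread, so the caller gets a Future back right away.
  def askLike(message: String): Future[String] = Future {
    Thread.sleep(100) // simulate network and processing delay
    s"reply to $message"
  }

  def main(args: Array[String]): Unit = {
    val reply = askLike("GetLocations") // does not wait for the reply
    println(s"completed right after the call? ${reply.isCompleted}")

    // Non-blocking style: register a callback that runs when the reply is ready.
    reply.foreach(r => println(s"callback got: $r"))

    // Blocking here only so the demo does not exit before the reply arrives.
    println(Await.result(reply, 2.seconds))
  }
}
```

The caller is free to do other work between the call and the moment it actually needs the value, which is the point of returning a Future instead of the reply itself.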

askSync

/**
 * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
 * default timeout, throw an exception if this fails.
 *
 * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
 * loop of [[RpcEndpoint]].
 * @param message the message to send
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 */
def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)

/**
 * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
 * specified timeout, throw an exception if this fails.
 *
 * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
 * loop of [[RpcEndpoint]].
 *
 * @param message the message to send
 * @param timeout the timeout duration
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 */
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
  val future = ask[T](message, timeout)
  timeout.awaitResult(future)
}

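In earlier versions, synchronous requests were implemented by the askDriverWithReply function, which was built on AkkaUtils.askWithReply.

In the new version, askSync is a blocking call built on top of ask: it calls awaitResult to wait for the asynchronous result of ask for at most timeout. Either a result comes back within the timeout or an exception is thrown. In essence this implements a synchronous call with an asynchronous one, simply by waiting with a deadline.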
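The "synchronous on top of asynchronous" idea can be reproduced with the standard library alone. In the sketch below, `ask` and `askSync` are hypothetical simplified versions that take a delay parameter for demonstration, and `Await.result` stands in for Spark's `timeout.awaitResult`, which I assume adds its own error handling on top.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object AskSyncSketch {
  // Asynchronous primitive: a hypothetical endpoint that answers after delayMs.
  def ask(message: String, delayMs: Long): Future[String] = Future {
    Thread.sleep(delayMs)
    s"ack:$message"
  }

  // Synchronous wrapper: wait for the Future, but never longer than timeout.
  // Await.result throws a TimeoutException if the deadline passes first.
  def askSync(message: String, delayMs: Long, timeout: FiniteDuration): String =
    Await.result(ask(message, delayMs), timeout)

  def main(args: Array[String]): Unit = {
    // The reply arrives well inside the timeout, so the value is returned.
    println(askSync("fast", 10, 1.second))

    // The reply would take 2 s but we only wait 100 ms, so the call fails.
    Try(askSync("slow", 2000, 100.millis)) match {
      case Failure(_: TimeoutException) => println("slow request timed out")
      case other => println(other)
    }
  }
}
```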

tell

/** Send a one-way message to the master endpoint, to which we expect it to reply with true. */
private def tell(message: Any) {
  if (!driverEndpoint.askSync[Boolean](message)) {
    throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
  }
}

tell is a stricter version of askSync: after the message is sent to the endpoint, the reply must be true, otherwise an exception is thrown.
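The same pattern, stripped of RPC, looks like this. The `Endpoint` type alias and the message strings are made up for the example, and `IllegalStateException` stands in for `SparkException` so that the sketch runs without Spark on the classpath.

```scala
object TellSketch {
  // Hypothetical endpoint: a plain function that answers true or false.
  type Endpoint = Any => Boolean

  // Mirrors the shape of tell: a false reply is turned into an exception.
  def tell(endpoint: Endpoint, message: Any): Unit =
    if (!endpoint(message)) {
      throw new IllegalStateException(s"endpoint returned false for $message")
    }

  def main(args: Array[String]): Unit = {
    val endpoint: Endpoint = {
      case "RemoveExecutor" => true
      case _ => false
    }

    tell(endpoint, "RemoveExecutor") // reply is true, nothing happens

    try tell(endpoint, "Unknown") // reply is false, so tell throws
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}
```

Callers of tell therefore never have to inspect a Boolean themselves: a normal return means the endpoint accepted the message.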

Functionality

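The main operations are:

  • removeExecutor: remove a dead Executor;
  • registerBlockManager: register a BlockManager;
  • updateBlockInfo: update the information about a block;
  • getLocations: get the locations (the BlockManagers) that hold the block with the given blockId;
  • getLocationsAndStatus: get the locations and the status of the block with the given blockId;
  • contains: check whether a block is known to the master, implemented with getLocations;
  • getPeers: get the IDs (BlockManagerId) of the BlockManagers on the other nodes of the cluster;
  • getExecutorEndpointRef: get the Ref of the SlaveEndpoint that belongs to a given Executor;
  • removeBlock: remove the block with the given blockId; the endpoint first finds the BlockManagers that hold the block, then uses their SlaveEndpoints to perform the removal;
  • removeRdd: remove all blocks that belong to the given RDD;
  • removeShuffle: remove all blocks that belong to the given Shuffle;
  • removeBroadcast: remove all blocks that belong to the given Broadcast;
  • getMemoryStatus, getStorageStatus: get storage information;
  • getBlockStatus: get the status of a block;
  • getMatchingBlockIds: look up blockIds that match a filter;
  • hasExclusiveCachedBlocks: check whether an executor holds certain cached blocks.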
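Most of these operations follow the same recipe: wrap the arguments in a message, send it, and interpret the reply. Below is a rough sketch of that recipe, including contains built on getLocations. The `ToMaster` trait, the `Master` class, and the use of strings for block and executor ids are assumptions made for the example, not Spark's definitions.

```scala
object MasterSketch {
  // Hypothetical message type for one of the operations listed above.
  sealed trait ToMaster
  final case class GetLocations(blockId: String) extends ToMaster

  // The wrapper only knows how to send a message and get a reply back.
  class Master(askSync: ToMaster => Any) {
    // Turn the operation into a message and cast the untyped reply.
    def getLocations(blockId: String): Seq[String] =
      askSync(GetLocations(blockId)).asInstanceOf[Seq[String]]

    // contains reuses getLocations: a block exists if someone holds it.
    def contains(blockId: String): Boolean = getLocations(blockId).nonEmpty
  }

  def main(args: Array[String]): Unit = {
    // Fake endpoint that knows about exactly one block.
    val master = new Master({
      case GetLocations("rdd_0_0") => Seq("exec-1")
      case _ => Seq.empty[String]
    })
    println(master.contains("rdd_0_0"))
    println(master.contains("rdd_9_9"))
  }
}
```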

Endpoint

BlockManagerMasterEndpoint

This refers to the file BlockManagerMasterEndpoint.scala in the official source code, which contains two classes: BlockManagerMasterEndpoint and BlockManagerInfo. BlockManagerMasterEndpoint is the main class. BlockManagerInfo describes a BlockManager and is instantiated inside BlockManagerMasterEndpoint.

BlockManagerMasterEndpoint

BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses of all slaves’ block managers.

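In earlier versions it was called BlockManagerMasterActor (source: 《深入理解Spark:核心思想与源码分析》).

BlockManagerMasterEndpoint exists only on the Driver. An Executor obtains a reference to it through BlockManagerMaster and sends it messages (using ask, askSync, and tell) to interact with the Driver. Its source code maintains several cache data structures: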

// Mapping from block manager id to the block manager's information.
private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

// Mapping from external shuffle service block manager id to the block statuses.
private val blockStatusByShuffleService =
  new mutable.HashMap[BlockManagerId, JHashMap[BlockId, BlockStatus]]

// Mapping from executor ID to block manager ID.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

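The core method of the class is receiveAndReply. It returns a partial function that pattern-matches each message received by BlockManagerMasterEndpoint and dispatches it to the corresponding method of the class, much like a switch/case statement.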

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
    context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
  ......
  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))
}

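The remaining methods of the class each correspond to one message and implement registration, updates, lookups, and so on. Their implementations are not covered here.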
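To make the dispatch pattern concrete, here is a small self-contained sketch of a receiveAndReply-style partial function. The message classes are simplified hypothetical versions (the real ones carry more fields), strings replace `BlockId` and `BlockManagerId`, and a plain `reply` callback plays the role of `context.reply`.

```scala
import scala.collection.mutable

object DispatchSketch {
  // Hypothetical, simplified messages.
  final case class UpdateBlockInfo(blockManagerId: String, blockId: String)
  final case class GetLocations(blockId: String)

  // Simplified cache: block id -> ids of the block managers that hold it.
  private val blockLocations =
    mutable.HashMap.empty[String, mutable.HashSet[String]]

  // Each case matches one message type, does the work, and sends a reply.
  def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
    case UpdateBlockInfo(bmId, blockId) =>
      blockLocations.getOrElseUpdate(blockId, mutable.HashSet.empty[String]) += bmId
      reply(true)
    case GetLocations(blockId) =>
      reply(blockLocations.get(blockId).map(_.toSeq.sorted).getOrElse(Seq.empty[String]))
  }

  def main(args: Array[String]): Unit = {
    val handler = receiveAndReply(r => println(s"reply: $r"))
    handler(UpdateBlockInfo("bm-1", "rdd_0_0"))
    handler(GetLocations("rdd_0_0"))
    // A partial function can also report that it has no case for a message.
    println(handler.isDefinedAt("unknown message"))
  }
}
```

Because the handler is a PartialFunction rather than a total one, adding a new operation only means adding a new case, and unmatched messages can be detected with isDefinedAt instead of falling through silently.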

BlockManagerInfo

This class describes a BlockManager. BlockManagerMasterEndpoint stores BlockManagers as a mapping from BlockManagerId to BlockManagerInfo, and the class holds the BlockManager's blockManagerId, slaveEndpoint, and related fields:

private[spark] class BlockManagerInfo(
    val blockManagerId: BlockManagerId,
    timeMs: Long,
    val maxOnHeapMem: Long,
    val maxOffHeapMem: Long,
    val slaveEndpoint: RpcEndpointRef,
    val externalShuffleServiceBlockStatus: Option[JHashMap[BlockId, BlockStatus]])
  extends Logging

BlockManagerSlaveEndpoint

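An RpcEndpoint to take commands from the master to execute options. For example, this is used to remove blocks from the slave’s BlockManager.

The SlaveEndpoint works together with the BlockManager to carry out operations requested by the driver and the executors (through BlockManagerMaster). Its core method is also receiveAndReply, although it handles fewer kinds of messages, mainly removing Blocks, RDDs, and Broadcasts and fetching information. Every matched operation is carried out by the corresponding class (such as BlockManager or shuffleManager).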

Others

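  • I do not yet understand the concepts of BlockManager, Shuffle, and Broadcast very well, so some parts of this article are vague. I hope to refine it once I understand them better.
  • I wrote about BlockManagerMaster because the last session of my cloud computing course asked everyone to prepare an analysis of some part of the Spark source code. In class I hastily picked the relatively simple BlockManagerMaster part, skimmed the source, read a few rather vague articles, and came away with a basic understanding. I was not called on to present, but so as not to waste the notes from those few dozen minutes, I have written them up here in more detail.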
